package com.ruyuan.little.project.rocketmq.api.order.listener;


import com.alibaba.fastjson.JSON;
import com.ruyuan.little.project.common.dto.CommonResponse;
import com.ruyuan.little.project.common.enums.ErrorCodeEnum;
import com.ruyuan.little.project.common.enums.LittleProjectTypeEnum;
import com.ruyuan.little.project.redis.api.RedisApi;
import com.ruyuan.little.project.rocketmq.api.order.dto.OrderInfoDTO;
import com.ruyuan.little.project.rocketmq.api.order.service.OrderService;
import com.ruyuan.little.project.rocketmq.common.constants.RedisKeyConstant;
import org.apache.dubbo.config.annotation.Reference;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;

/**
 * 订单延迟监
 * @author 强军
 */
@Component
public class OrderDelayMessageListener implements MessageListenerConcurrently {

    private static final Logger LOGGER = LoggerFactory.getLogger(OrderDelayMessageListener.class);

    @Resource
    private OrderService orderService;


    /**
     * redis dubbo服务
     */
    @Reference(version = "1.0.0",
            interfaceClass = RedisApi.class,
            cluster = "failfast",check = false)
    private RedisApi redisApi;

    /**
     * 延迟消息：订单30分钟未支付，自动取消的业务
     *
     * @param msgs
     * @param consumeConcurrentlyContext
     * @return
     */
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {


        for (MessageExt msg : msgs) {

            String body = new String(msg.getBody(), StandardCharsets.UTF_8);
            OrderInfoDTO orderInfoDTO = JSON.parseObject(body, OrderInfoDTO.class);
            String orderNo = orderInfoDTO.getOrderNo();
            String phoneNumber = orderInfoDTO.getPhoneNumber();
            LOGGER.info("received order delay message orderNo:{}", orderNo);

            try {
                CommonResponse<Boolean> commonResponse = redisApi.lock(RedisKeyConstant.ORDER_LOCK_KEY_PREFIX + orderNo,
                        orderNo, 10L, TimeUnit.SECONDS, phoneNumber, LittleProjectTypeEnum.ROCKETMQ);
                if (Objects.equals(commonResponse.getCode(), ErrorCodeEnum.SUCCESS.getCode()) && Objects.equals(commonResponse.getData(), Boolean.TRUE)) {
                    // 修改订单状态
                    try {
                        orderService.cancelOrder(orderNo, phoneNumber);
                    } catch (Exception e) {
                        LOGGER.info("cancel order fail error message:{}", e);
                    }
                }
            } finally {
                redisApi.unlock(RedisKeyConstant.ORDER_LOCK_KEY_PREFIX + orderNo,orderNo,phoneNumber,LittleProjectTypeEnum.ROCKETMQ);
            }
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}
